package com.example.aspect;

import com.example.annotations.RpcRetry;
import com.example.config.entity.RetryConfig;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.rabbitmq.client.impl.recovery.RetryResult;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

/**
 * Aspect for the {@link RpcRetry} retry annotation.
 *
 * <p>For every intercepted call the advice reads the upstream retry headers from the current
 * servlet request (when one is bound to the thread), publishes a {@link RetryConfig} through the
 * injected {@code RetryConfigContext}, and then runs the target method through a guava-retrying
 * {@link Retryer}.
 *
 * @author lr
 * @create 2023/2/5
 */
@Aspect
@Component
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public class RpcRetryAspect {
    /** Fixed pause between two consecutive attempts, in milliseconds. */
    private static final long RETRY_WAIT_MILLIS = 500L;

    @Resource
    private RetryConfigContext retryConfigContext;

    /**
     * Matches methods annotated with {@link RpcRetry}.
     *
     * <p>The fully-qualified name in the expression must be the same annotation type that
     * {@link #around(ProceedingJoinPoint)} reads via {@code getAnnotation(RpcRetry.class)};
     * otherwise the advice either never fires or dereferences a {@code null} annotation.
     */
    @Pointcut("@annotation(com.example.annotations.RpcRetry)")
    public void pointcut() {
    }

    /**
     * Runs the annotated method with retry handling.
     *
     * @param joinPoint the intercepted invocation
     * @return the value returned by the first successful attempt, or {@code null} when no
     *         attempt succeeded (the failure is only logged, see the review note below)
     * @throws NoMoreRetryException when the retry context carries the "no more retry" flag
     *         before an attempt is made
     */
    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws NoMoreRetryException {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        RpcRetry rpcRetryBaseConfig = method.getAnnotation(RpcRetry.class);

        // Resolve the retry parameters passed down by the upstream caller.
        RetryStatus retryUpstreamParams = extractRetryUpstreamParams(rpcRetryBaseConfig);
        if (retryUpstreamParams.getDdl() <= 0) {
            log.error("The request has reached the deadline.");
            throw new BusinessException("The request has reached the deadline.");
        }

        // Create the recorder that tracks this retry run and publish it to the context.
        RetryConfig retryConfig = buildRetryRecorder(retryUpstreamParams);

        if (rpcRetryBaseConfig.noMoreRetry()) {
            // The response may be absent (no request bound, or attributes created without a
            // response), so guard before touching it instead of failing with an NPE.
            ServletRequestAttributes requestAttributes = currentServletRequestAttributes();
            if (requestAttributes != null && requestAttributes.getResponse() != null) {
                requestAttributes.getResponse().addHeader(
                        RetryStatus.NO_MORE_RETRY_HEADER,
                        String.valueOf(rpcRetryBaseConfig.noMoreRetry()));
            } else {
                log.warn("No servlet response bound to the current thread; cannot set header {}.",
                        RetryStatus.NO_MORE_RETRY_HEADER);
            }
        }

        // A request that is itself a retry gets exactly one attempt; otherwise use the configured
        // count. stopAfterAttempt rejects values below 1, so clamp to a single attempt.
        int retryTimes = retryUpstreamParams.isRetryRequest()
                ? 1
                : Math.max(1, rpcRetryBaseConfig.retryTimes());

        RetryerBuilder<RetryResult> retryerBuilder = RetryerBuilder.newBuilder();
        Retryer<RetryResult> retryer = retryerBuilder
                .withStopStrategy(StopStrategies.stopAfterAttempt(retryTimes))
                .retryIfExceptionOfType(RetryForCaughtException.class)
                .retryIfRuntimeException()
                .withWaitStrategy(WaitStrategies.fixedWait(RETRY_WAIT_MILLIS, TimeUnit.MILLISECONDS))
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        // Record the attempt number reported by the retryer and republish it.
                        retryConfig.setRetryTimes(attempt.getAttemptNumber());
                        retryConfigContext.set(retryConfig);
                    }
                })
                .build();

        AtomicReference<Object> ret = new AtomicReference<>();
        RetryResult retryResult = RetryResult.FAIL;
        try {
            retryResult = retryer.call(() -> {
                // Check the stop-retrying flag propagated back from downstream.
                if (retryConfigContext.getRetryConfig().isNoMoreRetry()) {
                    log.warn("Received no more retry flag. Stop retrying.");
                    return RetryResult.NO_MORE_RETRY;
                }
                try {
                    ret.set(joinPoint.proceed());
                } catch (Throwable throwable) {
                    log.error("Throwable: ", throwable);
                    // Wrap so the retryer's exception predicate matches any failure type.
                    throw new RetryForCaughtException(throwable);
                }
                return RetryResult.SUCCESS;
            });
        } catch (ExecutionException e) {
            log.error("Occur ExecutionException", e);
        } catch (RetryException e) {
            log.error("Occur RetryException", e);
        }
        // NOTE(review): when every attempt fails, the failure is only logged above and the
        // caller receives null. Confirm callers rely on this before changing it to rethrow.
        // NOTE(review): the RetryConfigContext entry set during this call is never cleared
        // here; verify whether the context needs explicit cleanup.
        if (retryResult == RetryResult.NO_MORE_RETRY) {
            throw new NoMoreRetryException("Received no more retry flag. Stop retrying.");
        }
        return ret.get();
    }

    /**
     * Builds the {@link RetryConfig} recorder for this call and stores it in the retry context.
     *
     * @param retryUpstreamParams retry parameters resolved from the upstream request
     * @return the recorder, initialised with the upstream deadline, the is-retry flag and the
     *         current time in milliseconds as start time
     */
    public RetryConfig buildRetryRecorder(RetryStatus retryUpstreamParams) {
        RetryConfig retryConfig = new RetryConfig();
        retryConfig.setLastDdl(retryUpstreamParams.getDdl());
        retryConfig.setRetryRequest(retryUpstreamParams.isRetryRequest());
        retryConfig.setStartTime(System.currentTimeMillis());
        retryConfigContext.set(retryConfig);
        return retryConfig;
    }

    /**
     * Reads the upstream retry parameters from the current servlet request headers.
     *
     * <p>Falls back to "not a retry request" and the annotation's {@code ddl()} when no servlet
     * request is bound to the thread, when the deadline header is absent, or when it is not a
     * valid integer.
     *
     * @param rpcRetry the annotation supplying the default deadline
     * @return the resolved retry status; never {@code null}
     */
    public RetryStatus extractRetryUpstreamParams(RpcRetry rpcRetry) {
        RetryStatus retryStatus = new RetryStatus();
        ServletRequestAttributes requestAttributes = currentServletRequestAttributes();
        if (requestAttributes == null) {
            // No HTTP request on this thread: there are no upstream headers to read.
            retryStatus.setRetryRequest(false);
            retryStatus.setDdl(rpcRetry.ddl());
            return retryStatus;
        }
        HttpServletRequest request = requestAttributes.getRequest();

        // Boolean.parseBoolean maps a missing (null) header to false without throwing.
        retryStatus.setRetryRequest(
                Boolean.parseBoolean(request.getHeader(RetryStatus.IS_RETRY_REQUEST_HEADER)));

        String ddlHeader = request.getHeader(RetryStatus.DDL_HEADER);
        if (ddlHeader == null) {
            retryStatus.setDdl(rpcRetry.ddl());
        } else {
            try {
                retryStatus.setDdl(Integer.parseInt(ddlHeader.trim()));
            } catch (NumberFormatException ignored) {
                // Malformed deadline header: use the annotation default instead of failing.
                retryStatus.setDdl(rpcRetry.ddl());
            }
        }
        return retryStatus;
    }

    /**
     * Returns the servlet request attributes bound to the current thread, or {@code null} when
     * none are bound or the bound attributes are not servlet-based.
     */
    private ServletRequestAttributes currentServletRequestAttributes() {
        Object attributes = RequestContextHolder.getRequestAttributes();
        if (attributes instanceof ServletRequestAttributes) {
            return (ServletRequestAttributes) attributes;
        }
        return null;
    }
}